Skip to content

LangGraph 模块使用分析

本文分析当前代码库中 LangGraph 模块的使用情况,并提供后续集成建议。

一、已使用的模块

模块使用位置使用方式
Stateservices/langgraph_chat.py使用 MessagesState 作为状态类型
Nodesservices/langgraph_chat.py定义 call_model 节点处理 LLM 调用
Graphservices/langgraph_chat.py使用 StateGraph 构建工作流
Edgesservices/langgraph_chat.py使用 STARTadd_edge 连接节点
Checkpointerservices/checkpointer.py使用 PySQLSaver (MySQL) 持久化
Streamingservices/langgraph_chat.py使用 stream_mode="messages" 流式输出

代码示例

python
# services/langgraph_chat.py
from langgraph.graph import START, StateGraph, MessagesState

def call_model(state: MessagesState):
    """调用 LLM 生成响应"""
    chain = prompt_template | llm
    response = chain.invoke(state["messages"])
    return {"messages": [response]}

# 构建工作流
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_edge(START, "agent")
app = workflow.compile(checkpointer=checkpointer)

# 流式输出
for chunk in app.stream(..., stream_mode="messages"):
    yield message_chunk.content

当前架构图

┌─────────────────────────────────────────┐
│           当前 LangGraph 架构            │
├─────────────────────────────────────────┤
│                                         │
│   START ──► [agent] ──► END            │
│              │                          │
│              ▼                          │
│        call_model()                     │
│        (LLM 调用)                        │
│              │                          │
│              ▼                          │
│        Checkpointer                     │
│        (MySQL 持久化)                    │
│                                         │
└─────────────────────────────────────────┘

特点:

  • 单节点线性流程
  • 仅支持纯对话(无工具调用)
  • 状态通过 MySQL Checkpointer 持久化
  • 支持 token 级流式输出

二、未使用的模块

2.1 基础构建层

模块说明复杂度
Tools未使用工具调用功能,当前是纯对话 Agent

2.2 执行控制层

模块说明复杂度
Conditional Edges未使用条件边,当前是单节点线性流程
Command未使用命令模式,无法精确控制跳转和状态更新
Recursion未使用递归/循环逻辑

2.3 高级特性层

模块说明复杂度
Store未使用跨会话长期记忆存储
Interrupts未使用中断/人机协作功能
Subgraphs未使用子图模块化
Parallel未使用并行执行
Trimming未使用消息裁剪(长对话可能超出上下文)
Summarization未使用对话摘要

三、适合集成的业务场景

3.1 Tools(工具调用)⭐⭐⭐⭐⭐

优先级:高

为什么需要: 当前 Agent 只能进行纯对话,无法执行实际操作或获取实时信息。

适用场景:

场景描述实现方式
联网搜索让 AI 搜索实时信息集成搜索 API(如 Tavily、SerpAPI)
数据库查询让 AI 查询用户数据、订单信息创建数据库查询工具
API 调用调用外部服务封装 REST API 调用
知识库搜索搜索内部知识库集成向量数据库

示例实现:

python
from langchain.tools import tool

@tool
def search_web(query: str) -> str:
    """搜索网络获取实时信息"""
    # 调用搜索 API
    results = search_api.search(query)
    return format_results(results)

@tool
def get_user_info(user_id: str) -> dict:
    """获取用户信息"""
    # 查询数据库
    user = db.query(User).filter_by(id=user_id).first()
    return {"name": user.name, "email": user.email}

@tool
def send_notification(user_id: str, message: str) -> str:
    """发送通知给用户"""
    # 调用通知服务
    notification_service.send(user_id, message)
    return "通知已发送"

集成步骤:

  1. 定义工具函数
  2. 将工具绑定到 LLM
  3. 添加工具执行节点
  4. 使用条件边判断是否需要调用工具

3.2 Conditional Edges(条件边)⭐⭐⭐⭐⭐

优先级:高

为什么需要: 配合 Tools 使用,根据 LLM 响应动态决定下一步操作。

适用场景:

场景描述
工具调用判断判断是否需要调用工具
智能路由根据问题类型选择不同处理节点
多模型协作不同任务使用不同模型

示例实现:

python
from typing import Literal

def should_continue(state: MessagesState) -> Literal["tools", END]:
    """判断是否需要调用工具"""
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"  # 有工具调用,执行工具
    return END  # 无工具调用,结束

# 构建带条件边的工作流
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
builder.add_edge("tools", "agent")  # 工具执行后返回 agent

完整架构图:

┌─────────────────────────────────────────────┐
│        带 Tools 的 Agent 架构                │
├─────────────────────────────────────────────┤
│                                             │
│   START ──► [agent] ──► should_continue()   │
│                │              │              │
│                │              ├──► END       │
│                │              │              │
│                │              └──► [tools]   │
│                │                    │         │
│                └────────────────────┘         │
│                                             │
└─────────────────────────────────────────────┘

3.3 Interrupts(中断/人机协作)⭐⭐⭐⭐

优先级:中高

为什么需要: 某些敏感操作需要人工确认后才能执行,提高系统安全性和可控性。

适用场景:

场景描述示例
敏感操作审批执行前需要人工确认发送邮件、删除数据、执行支付
内容审核AI 生成内容需要确认后发布自动回复、公告发布
信息确认关键信息需要用户确认订单信息、联系方式

示例实现:

python
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver

def send_email_node(state: State):
    """发送邮件节点(带人机确认)"""
    # 中断等待用户确认
    decision = interrupt({
        "action": "send_email",
        "to": state["email_to"],
        "subject": state["email_subject"],
        "body": state["email_body"],
        "message": "确认发送此邮件?"
    })

    if decision:
        # 用户批准,执行发送
        send_email(state["email_to"], state["email_subject"], state["email_body"])
        return {"status": "sent", "email_sent": True}
    return {"status": "cancelled", "email_sent": False}

# 使用
result = graph.invoke({"email_to": "user@example.com", ...})
# 检查是否需要人工确认
if result.get("__interrupts__"):
    # 返回前端,等待用户确认
    return {"need_confirmation": True, "interrupts": result["__interrupts__"]}

# 用户确认后恢复执行
result = graph.invoke(Command(resume=True), config)

前端交互流程:

┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│  用户请求  │ ──► │  AI 处理  │ ──► │  中断等待  │ ──► │  用户确认  │
└──────────┘     └──────────┘     └──────────┘     └──────────┘


                                     ┌──────────┐
                                     │  继续执行  │
                                     └──────────┘

3.4 Store(长期记忆)⭐⭐⭐⭐

优先级:中高

为什么需要: 当前 Checkpointer 只能保存单次会话的状态,无法跨会话记住用户偏好。

适用场景:

场景描述
用户偏好记忆记住用户的语言偏好、回答风格偏好
个性化对话根据用户历史调整对话风格
知识积累跨会话积累用户相关信息
上下文增强在新会话中引用之前的对话内容

示例实现:

python
from langgraph.store.memory import InMemoryStore
from langgraph.store.postgres import PostgresStore

# 创建 Store(支持语义搜索)
store = PostgresStore.from_conn_string(DB_URI)
store.setup()

def call_model(state: MessagesState, runtime: Runtime[Context]):
    user_id = runtime.context.user_id
    namespace = ("memories", user_id)

    # 搜索相关记忆
    memories = runtime.store.search(
        namespace,
        query=str(state["messages"][-1].content)
    )

    # 构建记忆上下文
    memory_context = "\n".join([m.value["text"] for m in memories])
    system_prompt = f"用户相关信息:\n{memory_context}\n\n你是一个友好的助手。"

    # 如果用户要求记住某些信息
    if "记住" in state["messages"][-1].content:
        # 提取并存储记忆
        memory = extract_memory(state["messages"][-1].content)
        runtime.store.put(namespace, str(uuid.uuid4()), {"text": memory})

    # 调用 LLM
    response = llm.invoke([
        {"role": "system", "content": system_prompt},
        *state["messages"]
    ])
    return {"messages": [response]}

# 编译时传入 store
graph = builder.compile(checkpointer=checkpointer, store=store)

3.5 Trimming / Summarization(记忆管理)⭐⭐⭐

优先级:中

为什么需要: 当前实现没有处理长对话超出上下文窗口的问题。

适用场景:

场景描述
长对话处理对话超过上下文窗口时自动裁剪
成本优化减少不必要的 token 消耗
关键信息保留用摘要替代旧消息,保留关键信息

Trimming 示例:

python
from langchain_core.messages import trim_messages

def call_model(state: MessagesState):
    # 裁剪消息,保留最近 4000 tokens
    trimmed_messages = trim_messages(
        state["messages"],
        max_tokens=4000,
        strategy="last",
        token_counter=len,  # 或使用更精确的计数器
        start_on="human",
        end_on=("human", "tool"),
    )

    response = llm.invoke(trimmed_messages)
    return {"messages": [response]}

Summarization 示例:

python
class State(TypedDict):
    messages: list
    summary: str

def summarize_if_needed(state: State):
    if len(state["messages"]) > 10:
        # 生成摘要
        summary_prompt = f"总结以下对话:\n{state['messages']}"
        new_summary = llm.invoke(summary_prompt).content

        # 只保留最近 2 条消息 + 摘要
        return {
            "summary": new_summary,
            "messages": state["messages"][-2:]
        }
    return state

3.6 Parallel(并行执行)⭐⭐⭐

优先级:中

为什么需要: 某些场景需要同时执行多个独立任务,提升性能。

适用场景:

场景描述
多模型对比同时调用多个模型,对比结果
多任务处理同时生成文本、图片、摘要
多角度分析从不同角度分析同一问题

示例实现:

python
from operator import add
from typing import Annotated

class ParallelState(TypedDict):
    topic: str
    results: Annotated[list, add]  # 使用 reducer 合并结果

def task_joke(state: ParallelState):
    result = llm.invoke(f"写关于 {state['topic']} 的笑话")
    return {"results": [{"type": "joke", "content": result.content}]}

def task_poem(state: ParallelState):
    result = llm.invoke(f"写关于 {state['topic']} 的诗歌")
    return {"results": [{"type": "poem", "content": result.content}]}

def task_story(state: ParallelState):
    result = llm.invoke(f"写关于 {state['topic']} 的故事")
    return {"results": [{"type": "story", "content": result.content}]}

def aggregator(state: ParallelState):
    combined = f"关于 {state['topic']} 的创作:\n\n"
    for r in state["results"]:
        combined += f"【{r['type']}\n{r['content']}\n\n"
    return {"final_output": combined}

# 构建并行工作流
builder = StateGraph(ParallelState)
builder.add_node("joke", task_joke)
builder.add_node("poem", task_poem)
builder.add_node("story", task_story)
builder.add_node("aggregator", aggregator)

# 并行边:START 同时指向多个节点
builder.add_edge(START, "joke")
builder.add_edge(START, "poem")
builder.add_edge(START, "story")

# 汇聚边:所有节点都指向聚合节点
builder.add_edge("joke", "aggregator")
builder.add_edge("poem", "aggregator")
builder.add_edge("story", "aggregator")
builder.add_edge("aggregator", END)

graph = builder.compile()

架构图:

                    ┌──────────┐
                    │  START   │
                    └────┬─────┘

          ┌──────────────┼──────────────┐
          │              │              │
          ▼              ▼              ▼
     ┌────────┐    ┌────────┐    ┌────────┐
     │  joke  │    │  poem  │    │ story  │
     └───┬────┘    └───┬────┘    └───┬────┘
         │             │             │
         └─────────────┼─────────────┘


                 ┌──────────┐
                 │aggregator│
                 └────┬─────┘


                 ┌──────────┐
                 │   END    │
                 └──────────┘

3.7 Subgraphs(子图)⭐⭐

优先级:低

为什么需要: 将复杂逻辑拆分为可复用的子模块。

适用场景:

场景描述
复杂工作流模块化将大图拆分为可复用子模块
多步骤子流程如"分析->搜索->总结"作为一个整体复用

示例实现:

python
# 子图:搜索流程
class SearchState(TypedDict):
    query: str
    results: list

def search_node(state: SearchState):
    results = search_api(state["query"])
    return {"results": results}

search_builder = StateGraph(SearchState)
search_builder.add_node("search", search_node)
search_builder.add_edge(START, "search")
search_builder.add_edge("search", END)
search_graph = search_builder.compile()

# 父图中调用子图
def call_search(state: ParentState):
    result = search_graph.invoke({"query": state["user_query"]})
    return {"search_results": result["results"]}

四、集成建议优先级

优先级模块业务价值建议场景预估工时
🔴 P0Tools + Conditional Edges扩展 AI 能力联网搜索、数据库查询2-3 天
🟠 P1Interrupts安全可控敏感操作审批1-2 天
🟡 P2Store用户体验个性化对话2-3 天
🟡 P2Trimming成本优化长对话场景0.5 天
🟢 P3Parallel性能提升多模型对比1 天
⚪ P4Subgraphs代码复用复杂工作流2 天

五、推荐实施路径

阶段 1:增强 Agent 能力(P0)

  1. 实现基础 Tools(搜索、查询)
  2. 添加 Conditional Edges 实现工具调用路由
  3. 测试并优化工具调用体验

阶段 2:提升安全性和用户体验(P1-P2)

  1. 实现 Interrupts 用于敏感操作
  2. 集成 Store 实现跨会话记忆
  3. 添加 Trimming 处理长对话

阶段 3: 性能优化和模块化(P3-P4)

  1. 评估是否需要 Parallel 执行
  2. 对复杂流程考虑 Subgraphs 模块化

六、相关文档